package drds.plus.executor.cursor.cursor.impl.sort;

import drds.plus.common.properties.ConnectionProperties;
import drds.plus.executor.ExecuteContext;
import drds.plus.executor.cursor.cursor.ISortingCursor;
import drds.plus.executor.cursor.cursor.ITemporaryTableSortCursor;
import drds.plus.executor.cursor.cursor_factory.CursorFactory;
import drds.plus.executor.cursor.cursor_metadata.CursorMetaData;
import drds.plus.executor.cursor.cursor_metadata.CursorMetaDataImpl;
import drds.plus.executor.record_codec.RecordCodecFactory;
import drds.plus.executor.record_codec.record.KeyValueRecordPair;
import drds.plus.executor.record_codec.record.Record;
import drds.plus.executor.repository.Repository;
import drds.plus.executor.row_values.RowValues;
import drds.plus.executor.row_values.RowValuesWrapper;
import drds.plus.executor.table.ITable;
import drds.plus.executor.utils.Utils;
import drds.plus.sql_process.abstract_syntax_tree.configuration.ColumnMetaData;
import drds.plus.sql_process.abstract_syntax_tree.configuration.IndexMapping;
import drds.plus.sql_process.abstract_syntax_tree.configuration.IndexType;
import drds.plus.sql_process.abstract_syntax_tree.configuration.TableMetaData;
import drds.plus.sql_process.abstract_syntax_tree.expression.item.Item;
import drds.plus.sql_process.abstract_syntax_tree.expression.order_by.OrderBy;
import drds.plus.sql_process.type.Type;
import drds.plus.util.GeneralUtil;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Sorting cursor backed by a temporary table (depends on BDB, per the original author's note).
 * <p>
 * On first use the whole source cursor is drained into a temporary b-tree table obtained from the
 * {@link Repository}. Columns that match an ORDER BY item become the key of that table, every
 * other column becomes part of the value. All subsequent reads are served from a cursor over the
 * temporary table, wrapped so that rows expose the original column metadata.
 */
public class TemporaryTableSortCursor extends SortCursor implements ITemporaryTableSortCursor {
    /**
     * Marker placed between table name and column name in the names of temporary-table key columns.
     */
    private static final String _temporary_table_ = "_temporary_table_";
    /**
     * Marker placed between table name and column name in the names of temporary-table value
     * columns (see {@code build}). It must be stripped again before looking the column up in a
     * source row.
     */
    private static final String VALUE_COLUMN_SEPARATOR = "_ANDOR_TABLENAME_";
    /**
     * Name of the synthetic value column that carries a per-row sequence number when duplicates
     * are allowed. Written as a literal so the value does not depend on the default locale.
     */
    protected static final String identity = "__IDENTITY__";
    /**
     * JVM-wide counter used to make temporary table names unique.
     */
    protected static AtomicLong seed = new AtomicLong(0);
    private final Repository repository;
    private final long requestId;
    /**
     * Maximum number of rows copied into the temporary table; overwritten in the constructor from
     * {@link ConnectionProperties#TEMP_TABLE_MAX_ROWS}.
     */
    protected long sizeProtection = 102400;
    protected CursorFactory cursorFactory;
    /**
     * Whether rows beyond {@link #sizeProtection} are silently dropped ({@code true}) instead of
     * failing the query ({@code false}).
     */
    boolean cutRows = false;
    private final boolean sortedDuplicates;
    private boolean inited = false;
    /**
     * The original (unsorted) source cursor handed to the constructor.
     */
    private final ISortingCursor schematicCursor;
    private ITable table = null;
    /**
     * Metadata used to wrap rows read from the temporary table; stays {@code null} when the source
     * cursor turned out to be empty.
     */
    private CursorMetaData cursorMetaData = null;
    private final ExecuteContext executeContext;

    /**
     * @param executeContext   context used for writing into the temporary table and for reading
     *                         the row limit / cut-rows settings
     * @param cursorFactory    factory used to build the reverse-order cursor for descending sorts
     * @param repository       repository that provides the temporary table
     * @param requestId        id appended to the generated temporary table name
     * @param cursor           source cursor whose rows are to be sorted
     * @param orderByList      columns to sort by
     * @param sortedDuplicates whether duplicate keys are allowed
     */
    public TemporaryTableSortCursor(ExecuteContext executeContext, CursorFactory cursorFactory, Repository repository, long requestId, ISortingCursor cursor, List<OrderBy> orderByList, boolean sortedDuplicates) throws RuntimeException {
        super(cursor, orderByList);
        this.sortedDuplicates = sortedDuplicates;
        setCursorFactory(cursorFactory);
        this.schematicCursor = cursor;
        this.repository = repository;
        this.requestId = requestId;
        this.executeContext = executeContext;
        sizeProtection = GeneralUtil.getExtraCmdLong(executeContext.getExtraCmds(), ConnectionProperties.TEMP_TABLE_MAX_ROWS, 100000);
        cutRows = GeneralUtil.getExtraCmdBoolean(executeContext.getExtraCmds(), ConnectionProperties.TEMP_TABLE_CUT_ROWS, false);
    }

    /**
     * Lazily builds the temporary table the first time any read method is called.
     */
    private void prepare() throws RuntimeException {
        if (!inited) {
            prepare(repository, schematicCursor, orderByList);
            inited = true;
        }
    }

    /**
     * Drains {@code cursor} into a freshly created temporary table and switches this cursor over
     * to reading from that table.
     *
     * @param repository  repository that provides the temporary table
     * @param cursor      source cursor; it is closed once all rows have been copied
     * @param orderByList ORDER BY items; the direction of the first item decides whether the
     *                    temporary-table cursor is wrapped in a reverse-order cursor
     * @return the cursor over the temporary table, or {@code null} when the source cursor has no
     * rows (in that case nothing is created and the source cursor is left in place)
     * @throws IllegalStateException if the source has more than {@link #sizeProtection} rows and
     *                               {@link #cutRows} is {@code false}
     */
    protected ISortingCursor prepare(Repository repository, ISortingCursor cursor, List<OrderBy> orderByList) throws RuntimeException {
        RowValues rowData = cursor.next();
        if (rowData == null) {
            return null;
        }
        // Columns that describe the temporary table itself.
        List<ColumnMetaData> keyColumnMetaDataList = new ArrayList<ColumnMetaData>();
        List<ColumnMetaData> valueColumnMetaDataList = new ArrayList<ColumnMetaData>();
        // Columns that describe the rows handed back to the caller (CursorMetaData).
        List<ColumnMetaData> keyCursorMetaDataColumnMetaDataList = new ArrayList<ColumnMetaData>();
        List<ColumnMetaData> valueCursorMetaDataColumnMetaDataList = new ArrayList<ColumnMetaData>();
        // Walk the columns of the first row: ORDER BY columns go into the key, the rest into the
        // value. The metadata is derived from the actual row because (per the original author)
        // the lower layer rewrites avg(pk) into count and sum.
        build(rowData, orderByList, //
                keyColumnMetaDataList, valueColumnMetaDataList, //
                keyCursorMetaDataColumnMetaDataList, valueCursorMetaDataColumnMetaDataList);

        String oldTableName = rowData.getParentCursorMetaData().getColumnMetaDataList().iterator().next().getTableName();
        // The temporary table name starts with the external table name: cursor table-name matching
        // elsewhere takes everything before the first "." as the match key (original author calls
        // this a hack). incrementAndGet yields a unique sequence number without extra locking.
        String tableName = oldTableName + ".t." + System.currentTimeMillis() + "." + seed.incrementAndGet() + "requestId." + requestId;

        IndexMapping indexMapping = new IndexMapping(tableName, keyColumnMetaDataList, valueColumnMetaDataList, IndexType.btree, true, true);
        TableMetaData tableMetaData = new TableMetaData(tableName, new ArrayList(), indexMapping, null);
        tableMetaData.setTemporaryTable(true);
        tableMetaData.setSortedDuplicates(sortedDuplicates);
        table = repository.getTemporaryTable(tableMetaData);

        Record key = RecordCodecFactory.newRecordCodec(keyColumnMetaDataList).newRecord();
        boolean hasValueColumns = !valueColumnMetaDataList.isEmpty();
        Record value = null;
        if (hasValueColumns) {
            value = RecordCodecFactory.newRecordCodec(valueColumnMetaDataList).newRecord();
        }

        int identityNumber = 0;
        long size = 0;
        boolean protection = false;
        // Copy every source row into the temporary table.
        do {
            size++;
            if (size > sizeProtection) {
                protection = true;
                break;
            }
            for (ColumnMetaData columnMetaData : keyColumnMetaDataList) {
                String sourceColumnName = stripPrefix(columnMetaData.getColumnName(), _temporary_table_);
                // table name + column name locate the actual value in the source row
                Object object = Utils.getValue(rowData, columnMetaData.getTableName(), sourceColumnName, null);
                key.put(columnMetaData.getColumnName(), object);
            }
            if (hasValueColumns) {
                for (ColumnMetaData columnMetaData : valueColumnMetaDataList) {
                    String storedColumnName = columnMetaData.getColumnName();
                    if (identity.equals(storedColumnName)) {
                        // synthetic column, filled in below
                        continue;
                    }
                    // Value columns are named with VALUE_COLUMN_SEPARATOR by build(); strip that
                    // marker (and the key marker, as before) to get the source column name.
                    String sourceColumnName = stripPrefix(storedColumnName, _temporary_table_);
                    sourceColumnName = stripPrefix(sourceColumnName, VALUE_COLUMN_SEPARATOR);
                    Object object = Utils.getValue(rowData, columnMetaData.getTableName(), sourceColumnName, null);
                    value.put(storedColumnName, object);
                }
            }
            // Add the per-row sequence number to the value when duplicates are allowed.
            if (sortedDuplicates) {
                value.put(identity, identityNumber++);
            }
            table.put(this.executeContext, tableName, indexMapping, key, value);
        } while ((rowData = cursor.next()) != null);

        // The temporary-table cursor is opened with a fresh context, as in the original code.
        ISortingCursor temporaryCursor = table.getCursor(new ExecuteContext(), tableName, indexMapping);

        // Metadata of the rows returned to the caller: key columns first, then value columns,
        // with the synthetic identity column left out.
        List<ColumnMetaData> columnMetaDataList = new ArrayList<ColumnMetaData>();
        columnMetaDataList.addAll(keyCursorMetaDataColumnMetaDataList);
        for (ColumnMetaData columnMetaData : valueCursorMetaDataColumnMetaDataList) {
            if (!identity.equals(columnMetaData.getColumnName())) {
                columnMetaDataList.add(columnMetaData);
            }
        }
        // A descending first ORDER BY column is served by reading the table in reverse.
        if (!orderByList.get(0).getAsc()) {
            temporaryCursor = cursorFactory.reverseOrderCursor(this.executeContext, temporaryCursor);
        }

        List<RuntimeException> runtimeExceptionList = cursor.close(new ArrayList<RuntimeException>());
        // Switch over before throwing so that a later close() releases the temporary cursor.
        this.cursor = temporaryCursor;
        if (!runtimeExceptionList.isEmpty()) {
            throw runtimeExceptionList.get(0);
        }
        if (protection && !cutRows) {
            runtimeExceptionList = this.close(runtimeExceptionList);
            if (!runtimeExceptionList.isEmpty()) {
                throw runtimeExceptionList.get(0);
            }
            throw new IllegalStateException("temp where size protection , check your chars or enlarge the limination size . ");
        }
        cursorMetaData = CursorMetaDataImpl.buildCursorMetaData(columnMetaDataList);
        return temporaryCursor;
    }

    /**
     * Returns the part of {@code columnName} that follows the first occurrence of
     * {@code separator}, or {@code columnName} unchanged when the separator does not occur.
     */
    private static String stripPrefix(String columnName, String separator) {
        int index = columnName.indexOf(separator);
        if (index < 0) {
            return columnName;
        }
        return columnName.substring(index + separator.length());
    }

    /**
     * Splits the columns of {@code rowData} into key and value columns of the temporary table.
     * <p>
     * The first two lists receive the column definitions of the temporary table (with prefixed
     * column names); the last two lists receive the original column metadata used to describe the
     * rows returned to the caller.
     */
    private void build(RowValues rowData, List<OrderBy> orderByList, //
                       List<ColumnMetaData> keyColumnMetaDataList, List<ColumnMetaData> valueColumnMetaDataList, //
                       List<ColumnMetaData> keyCursorMetaDataColumnMetaDataList, List<ColumnMetaData> valueCursorMetaDataColumnMetaDataList) {
        List<ColumnMetaData> sourceColumns = rowData.getParentCursorMetaData().getColumnMetaDataList();
        // ORDER BY items that were matched by a column of the source row.
        Set<OrderBy> orderBySet = new HashSet<OrderBy>();
        for (ColumnMetaData columnMetaData : sourceColumns) {
            boolean isOrderByColumn = findOrderByByColumnMetaData(orderByList, columnMetaData, keyColumnMetaDataList, orderBySet);
            if (isOrderByColumn) {
                if (!keyCursorMetaDataColumnMetaDataList.contains(columnMetaData)) {
                    keyCursorMetaDataColumnMetaDataList.add(columnMetaData);
                }
                continue;
            }
            // Column does not match any ORDER BY item: it goes into the value.
            ColumnMetaData valueColumnMetaData = new ColumnMetaData(columnMetaData.getTableName(), columnMetaData.getTableName() + VALUE_COLUMN_SEPARATOR + columnMetaData.getColumnName(), columnMetaData.getAlias(), columnMetaData.getType(), columnMetaData.isNullable());
            if (!valueColumnMetaDataList.contains(valueColumnMetaData)) {
                valueColumnMetaDataList.add(valueColumnMetaData);
            }
            if (!valueCursorMetaDataColumnMetaDataList.contains(columnMetaData)) {
                valueCursorMetaDataColumnMetaDataList.add(columnMetaData);
            }
        }
        if (!sortedDuplicates) {
            return;
        }
        // Duplicates allowed: ORDER BY items that no source column matched are still added to the
        // key so that they take part in ordering rows with otherwise equal keys.
        if (keyColumnMetaDataList.size() < orderByList.size()) {
            for (OrderBy orderBy : orderByList) {
                if (orderBySet.contains(orderBy)) {
                    continue;
                }
                Item item = orderBy.getItem();
                ColumnMetaData columnMetaData = new ColumnMetaData(item.getTableName(), item.getTableName() + _temporary_table_ + item.getColumnName(), item.getAlias(), item.getType(), true);
                keyColumnMetaDataList.add(columnMetaData);
                keyCursorMetaDataColumnMetaDataList.add(columnMetaData);
            }
        }
        // Synthetic sequence-number column stored in the value.
        valueColumnMetaDataList.add(new ColumnMetaData(keyColumnMetaDataList.get(0).getTableName(), identity, null, Type.IntegerType, true));
    }

    /**
     * Looks for the first ORDER BY item whose table name (as normalised by
     * {@code Utils.getTableName}) and column name equal those of {@code columnMetaData}.
     * On a match the corresponding prefixed key column is appended to
     * {@code keyColumnMetaDataList} and the item is recorded in {@code orderBySet}.
     *
     * @return {@code true} if a matching ORDER BY item was found
     */
    private boolean findOrderByByColumnMetaData(List<OrderBy> orderByList, ColumnMetaData columnMetaData, //
                                                List<ColumnMetaData> keyColumnMetaDataList, Set<OrderBy> orderBySet) {
        for (OrderBy orderBy : orderByList) {
            Item item = orderBy.getItem();
            String tableName = Utils.getTableName(item.getTableName());
            if (columnMetaData == null || !Utils.getTableName(columnMetaData.getTableName()).equals(tableName)) {
                continue;
            }
            if (!columnMetaData.getColumnName().equals(item.getColumnName())) {
                continue;
            }
            // Column matches the ORDER BY item: it becomes part of the key.
            ColumnMetaData newColumnMetaData = new ColumnMetaData(columnMetaData.getTableName(), columnMetaData.getTableName() + _temporary_table_ + columnMetaData.getColumnName(), columnMetaData.getAlias(), columnMetaData.getType(), columnMetaData.isNullable());
            keyColumnMetaDataList.add(newColumnMetaData);
            orderBySet.add(orderBy);
            return true;
        }
        return false;
    }

    public CursorFactory getCursorFactory() {
        return cursorFactory;
    }

    public void setCursorFactory(CursorFactory cursorFactory) {
        this.cursorFactory = cursorFactory;
    }

    /**
     * Returns the next row in sort order, or {@code null} when there is none.
     */
    public RowValues next() throws RuntimeException {
        prepare();
        return wrap(parentCursorNext());
    }

    /**
     * Wraps a non-null temporary-table row so that it exposes the original column metadata;
     * {@code null} is passed through.
     */
    private RowValues wrap(RowValues rowData) {
        if (rowData == null) {
            return null;
        }
        return new RowValuesWrapper(cursorMetaData, rowData);
    }

    public boolean skipTo(Record keyRecord) throws RuntimeException {
        prepare();
        return parentCursorSkipTo(keyRecord);
    }

    public boolean skipTo(KeyValueRecordPair keyKeyValueRecordPair) throws RuntimeException {
        prepare();
        return parentCursorSkipTo(keyKeyValueRecordPair);
    }

    public RowValues current() throws RuntimeException {
        prepare();
        return wrap(parentCursorCurrent());
    }

    public RowValues first() throws RuntimeException {
        prepare();
        return wrap(parentCursorFirst());
    }

    /**
     * Returns the last row in sort order.
     */
    public RowValues last() throws RuntimeException {
        prepare();
        // Previously delegated to parentCursorPrev(), which made last() identical to prev().
        return wrap(parentCursorLast());
    }

    public RowValues prev() throws RuntimeException {
        prepare();
        return wrap(parentCursorPrev());
    }

    /**
     * Closes the underlying cursor and, if one was created, the temporary table. Failures are
     * collected into the returned list instead of being thrown.
     */
    public List<RuntimeException> close(List<RuntimeException> RuntimeExceptionList) {
        RuntimeExceptionList = parentCursorClose(RuntimeExceptionList);
        if (table != null) {
            try {
                table.close();
            } catch (RuntimeException e) {
                RuntimeExceptionList.add(e);
            }
        }
        return RuntimeExceptionList;
    }

    public String toString() {
        return toStringWithInden(0);
    }

    /**
     * Renders this cursor, its ORDER BY list and the source cursor with the given indentation.
     */
    public String toStringWithInden(int inden) {
        String tabTittle = GeneralUtil.getTab(inden);
        String tabContent = GeneralUtil.getTab(inden + 1);
        StringBuilder sb = new StringBuilder();

        sb.append(tabTittle).append("TemporaryTableCursor").append("\n");
        GeneralUtil.printAFieldToStringBuilder(sb, "addOrderByItemAndSetNeedBuild", this.orderByList, tabContent);
        // Guard the field that is actually dereferenced.
        if (this.schematicCursor != null) {
            sb.append(this.schematicCursor.toStringWithInden(inden + 1));
        }
        return sb.toString();
    }

    /**
     * Returns the metadata of the columns exposed by this cursor. When the source cursor was
     * empty no metadata could be derived from a row, and an empty list is returned.
     */
    public List<ColumnMetaData> getColumnMetaDataList() throws RuntimeException {
        prepare();
        if (this.cursorMetaData == null) {
            return new ArrayList<ColumnMetaData>();
        }
        return this.cursorMetaData.getColumnMetaDataList();
    }
}
